flink

您所在的位置:网站首页 flink 状态定时清理 flink

flink

2023-06-10 04:39| 来源: 网络整理| 查看: 265

import org.apache.flink.api.common.restartstrategy.RestartStrategies

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

import org.apache.flink.streaming.api.{CheckpointingMode }

import org.apache.flink.streaming.api.scala._

object ProcessFunctionTest {

def main(args: Array[String]): Unit = {

// 这个是流的检查点

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

//每隔多久Checkpointing 60000ms

env.enableCheckpointing(60000) // 特别注意 如果是测试的话,就1000ms测试看看效果,省的等待

//状态一致性的级别至少一次 默认是精确一次,后面的章节会具体讲解

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

//Checkpoint保存到状态后端 如果因网络延迟等问题的话,超时100000ms=100s 就不做Checkpoint

env.getCheckpointConfig.setCheckpointTimeout(100000)

//true 表示如果在做Checkpointing的时候出现了失败,就把整个job给取消掉,false就是不取消

env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

/*

设置Checkpoints的最大有几个可以并行,当然和setMinPauseBetweenCheckpoints(..)一起设置有点冲突。

注意 Checkpointing可能有先后 但是一个远行Checkpointing可能会好久,所以 很可能会出现某个时间段的并行

*/

env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

//设置每隔多久Checkpoints,当然和setMaxConcurrentCheckpoints(..)一起设置有点冲突。

env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)

/*

DELETE_ON_CANCELLATION表示手动取消的话job,就会删除Checkpoint。

RETAIN_ON_CANCELLATION表示手动取消job人物的话,就不删除Checkpoint。

如果是自动的job任务是失败,Checkpoint是默认保存的

*/

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

//如果job失败了,当重启的话,最大3次可以重启,每次重启间隔300ms

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,300) )

//如果job失败了,当重启的话,300s内最大3次可以重启,且每次的间隔为10s,这里的时间单位和收上面的不一样

env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))

env.execute("test")

}

}



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3